1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.util;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertTrue;
20
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.junit.Test;
29
30 import rx.Scheduler;
31 import rx.Subscription;
32 import rx.functions.Action0;
33 import rx.functions.Func1;
34 import rx.schedulers.Schedulers;
35
36 public class IndexedRingBufferTest {
37
38 @Test
39 public void add() {
40 IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
41 list.add(new LSubscription(1));
42 list.add(new LSubscription(2));
43 final AtomicInteger c = new AtomicInteger();
44
45 list.forEach(newCounterAction(c));
46 assertEquals(2, c.get());
47 }
48
49 @Test
50 public void removeEnd() {
51 IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
52 list.add(new LSubscription(1));
53 int n2 = list.add(new LSubscription(2));
54
55 final AtomicInteger c = new AtomicInteger();
56 list.forEach(newCounterAction(c));
57 assertEquals(2, c.get());
58
59 list.remove(n2);
60
61 final AtomicInteger c2 = new AtomicInteger();
62 list.forEach(newCounterAction(c2));
63 assertEquals(1, c2.get());
64 }
65
66 @Test
67 public void removeMiddle() {
68 IndexedRingBuffer<LSubscription> list = IndexedRingBuffer.getInstance();
69 list.add(new LSubscription(1));
70 int n2 = list.add(new LSubscription(2));
71 list.add(new LSubscription(3));
72
73 list.remove(n2);
74
75 final AtomicInteger c = new AtomicInteger();
76 list.forEach(newCounterAction(c));
77 assertEquals(2, c.get());
78 }
79
80 @Test
81 public void addRemoveAdd() {
82 IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
83 list.add("one");
84 list.add("two");
85 list.add("three");
86 ArrayList<String> values = new ArrayList<String>();
87 list.forEach(accumulate(values));
88 assertEquals(3, values.size());
89 assertEquals("one", values.get(0));
90 assertEquals("two", values.get(1));
91 assertEquals("three", values.get(2));
92
93 list.remove(1);
94
95 values.clear();
96 list.forEach(accumulate(values));
97 assertEquals(2, values.size());
98 assertEquals("one", values.get(0));
99 assertEquals("three", values.get(1));
100
101 list.add("four");
102
103 values.clear();
104 list.forEach(accumulate(values));
105 assertEquals(3, values.size());
106 assertEquals("one", values.get(0));
107 assertEquals("four", values.get(1));
108 assertEquals("three", values.get(2));
109
110 final AtomicInteger c = new AtomicInteger();
111 list.forEach(newCounterAction(c));
112 assertEquals(3, c.get());
113 }
114
115 @Test
116 public void addThousands() {
117 String s = "s";
118 IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
119 for (int i = 0; i < 10000; i++) {
120 list.add(s);
121 }
122 AtomicInteger c = new AtomicInteger();
123 list.forEach(newCounterAction(c));
124 assertEquals(10000, c.get());
125
126 list.remove(5000);
127 c.set(0);
128 list.forEach(newCounterAction(c));
129 assertEquals(9999, c.get());
130
131 list.add("one");
132 list.add("two");
133 c.set(0);
134
135
136
137 list.forEach(newCounterAction(c));
138 assertEquals(10001, c.get());
139 }
140
141 @Test
142 public void testForEachWithIndex() {
143 IndexedRingBuffer<String> buffer = IndexedRingBuffer.getInstance();
144 buffer.add("zero");
145 buffer.add("one");
146 buffer.add("two");
147 buffer.add("three");
148
149 final ArrayList<String> list = new ArrayList<String>();
150 int nextIndex = buffer.forEach(accumulate(list));
151 assertEquals(4, list.size());
152 assertEquals(list, Arrays.asList("zero", "one", "two", "three"));
153 assertEquals(0, nextIndex);
154
155 list.clear();
156 nextIndex = buffer.forEach(accumulate(list), 0);
157 assertEquals(4, list.size());
158 assertEquals(list, Arrays.asList("zero", "one", "two", "three"));
159 assertEquals(0, nextIndex);
160
161 list.clear();
162 nextIndex = buffer.forEach(accumulate(list), 2);
163 assertEquals(4, list.size());
164 assertEquals(list, Arrays.asList("two", "three", "zero", "one"));
165 assertEquals(2, nextIndex);
166
167 list.clear();
168 nextIndex = buffer.forEach(accumulate(list), 3);
169 assertEquals(4, list.size());
170 assertEquals(list, Arrays.asList("three", "zero", "one", "two"));
171 assertEquals(3, nextIndex);
172
173 list.clear();
174 nextIndex = buffer.forEach(new Func1<String, Boolean>() {
175
176 @Override
177 public Boolean call(String t1) {
178 list.add(t1);
179 return false;
180 }
181
182 }, 3);
183 assertEquals(1, list.size());
184 assertEquals(list, Arrays.asList("three"));
185 assertEquals(3, nextIndex);
186
187 list.clear();
188 nextIndex = buffer.forEach(new Func1<String, Boolean>() {
189 int i = 0;
190
191 @Override
192 public Boolean call(String t1) {
193 list.add(t1);
194 if (i++ == 2) {
195 return false;
196 } else {
197 return true;
198 }
199 }
200
201 }, 0);
202 assertEquals(3, list.size());
203 assertEquals(list, Arrays.asList("zero", "one", "two"));
204 assertEquals(2, nextIndex);
205 }
206
207 @Test
208 public void testForEachAcrossSections() {
209 IndexedRingBuffer<Integer> buffer = IndexedRingBuffer.getInstance();
210 for (int i = 0; i < 10000; i++) {
211 buffer.add(i);
212 }
213
214 final ArrayList<Integer> list = new ArrayList<Integer>();
215 int nextIndex = buffer.forEach(accumulate(list), 5000);
216 assertEquals(10000, list.size());
217 assertEquals(Integer.valueOf(5000), list.get(0));
218 assertEquals(Integer.valueOf(9999), list.get(4999));
219 assertEquals(Integer.valueOf(0), list.get(5000));
220 assertEquals(Integer.valueOf(4999), list.get(9999));
221 assertEquals(5000, nextIndex);
222 }
223
224 @Test
225 public void longRunningAddRemoveAddDoesntLeakMemory() {
226 String s = "s";
227 IndexedRingBuffer<String> list = IndexedRingBuffer.getInstance();
228 for (int i = 0; i < 20000; i++) {
229 int index = list.add(s);
230 list.remove(index);
231 }
232
233 AtomicInteger c = new AtomicInteger();
234 list.forEach(newCounterAction(c));
235 assertEquals(0, c.get());
236
237 assertTrue(list.index.get() < IndexedRingBuffer.SIZE);
238
239 assertEquals(1, list.index.get());
240 }
241
242 @Test
243 public void testConcurrentAdds() throws InterruptedException {
244 final IndexedRingBuffer<Integer> list = IndexedRingBuffer.getInstance();
245
246 Scheduler.Worker w1 = Schedulers.computation().createWorker();
247 Scheduler.Worker w2 = Schedulers.computation().createWorker();
248
249 final CountDownLatch latch = new CountDownLatch(2);
250
251 w1.schedule(new Action0() {
252
253 @Override
254 public void call() {
255 for (int i = 0; i < 10000; i++) {
256 list.add(i);
257 }
258 latch.countDown();
259 }
260
261 });
262 w2.schedule(new Action0() {
263
264 @Override
265 public void call() {
266 for (int i = 10000; i < 20000; i++) {
267 list.add(i);
268 }
269 latch.countDown();
270 }
271
272 });
273
274 latch.await();
275
276 w1.unsubscribe();
277 w2.unsubscribe();
278
279 AtomicInteger c = new AtomicInteger();
280 list.forEach(newCounterAction(c));
281 assertEquals(20000, c.get());
282
283 ArrayList<Integer> values = new ArrayList<Integer>();
284 list.forEach(accumulate(values));
285 Collections.sort(values);
286 int j = 0;
287 for (int i : values) {
288 assertEquals(i, j++);
289 }
290 }
291
292 @Test
293 public void testConcurrentAddAndRemoves() throws InterruptedException {
294 final IndexedRingBuffer<Integer> list = IndexedRingBuffer.getInstance();
295
296 final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
297
298 Scheduler.Worker w1 = Schedulers.computation().createWorker();
299 Scheduler.Worker w2 = Schedulers.computation().createWorker();
300
301 final CountDownLatch latch = new CountDownLatch(2);
302
303 w1.schedule(new Action0() {
304
305 @Override
306 public void call() {
307 try {
308 for (int i = 10000; i < 20000; i++) {
309 list.add(i);
310
311 }
312 } catch (Exception e) {
313 e.printStackTrace();
314 exceptions.add(e);
315 }
316 latch.countDown();
317 }
318
319 });
320
321 w2.schedule(new Action0() {
322
323 @Override
324 public void call() {
325 try {
326 for (int i = 0; i < 10000; i++) {
327 int index = list.add(i);
328
329 Integer v = list.remove(index);
330 if (v == null) {
331 throw new RuntimeException("should not get null");
332 }
333 list.add(v);
334 }
335 } catch (Exception e) {
336 e.printStackTrace();
337 exceptions.add(e);
338 }
339 latch.countDown();
340 }
341
342 });
343
344 latch.await();
345
346 w1.unsubscribe();
347 w2.unsubscribe();
348
349 AtomicInteger c = new AtomicInteger();
350 list.forEach(newCounterAction(c));
351 assertEquals(20000, c.get());
352
353 ArrayList<Integer> values = new ArrayList<Integer>();
354 list.forEach(accumulate(values));
355 Collections.sort(values);
356 int j = 0;
357 for (int i : values) {
358 assertEquals(i, j++);
359 }
360
361 if (exceptions.size() > 0) {
362 System.out.println("Exceptions: " + exceptions);
363 }
364 assertEquals(0, exceptions.size());
365 }
366
367 private <T> Func1<T, Boolean> accumulate(final ArrayList<T> list) {
368 return new Func1<T, Boolean>() {
369
370 @Override
371 public Boolean call(T t1) {
372 list.add(t1);
373 return true;
374 }
375
376 };
377 }
378
379 @SuppressWarnings("unused")
380 private Func1<Object, Boolean> print() {
381 return new Func1<Object, Boolean>() {
382
383 @Override
384 public Boolean call(Object t1) {
385 System.out.println("Object: " + t1);
386 return true;
387 }
388
389 };
390 }
391
392 private Func1<Object, Boolean> newCounterAction(final AtomicInteger c) {
393 return new Func1<Object, Boolean>() {
394
395 @Override
396 public Boolean call(Object t1) {
397 c.incrementAndGet();
398 return true;
399 }
400
401 };
402 }
403
404 public static class LSubscription implements Subscription {
405
406 private final int n;
407
408 public LSubscription(int n) {
409 this.n = n;
410 }
411
412 @Override
413 public void unsubscribe() {
414
415 }
416
417 @Override
418 public boolean isUnsubscribed() {
419 return false;
420 }
421
422 @Override
423 public String toString() {
424 return "Subscription=>" + n;
425 }
426 }
427 }